/*-------------------------------------------------------------------------
 *
 * deparse_table_stmts.c
 *	  All routines to deparse table statements.
 *	  This file contains all entry points specific for table statement deparsing as well
 *as functions that are currently only used for deparsing of the table statements.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "catalog/heap.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "parser/parse_expr.h"
#include "parser/parse_relation.h"
#include "parser/parse_type.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#ifdef DISABLE_OG_COMMENTS
#include "utils/ruleutils.h"
#endif
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/namespace_utils.h"
#include "distributed/version_compat.h"

static void AppendAlterTableSchemaStmt(StringInfo buf, AlterObjectSchemaStmt* stmt);
static void AppendAlterTableStmt(StringInfo buf, AlterTableStmt* stmt);
static void AppendAlterTableCmd(StringInfo buf, AlterTableCmd* alterTableCmd,
                                AlterTableStmt* stmt);
static void AppendAlterTableCmdAddColumn(StringInfo buf, AlterTableCmd* alterTableCmd,
                                         AlterTableStmt* stmt);
static void AppendAlterTableCmdDropConstraint(StringInfo buf,
                                              AlterTableCmd* alterTableCmd);

char* DeparseAlterTableSchemaStmt(Node* node)
{
    AlterObjectSchemaStmt* stmt = castNode(AlterObjectSchemaStmt, node);
    StringInfoData str = {0};
    initStringInfo(&str);

    Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

    AppendAlterTableSchemaStmt(&str, stmt);
    return str.data;
}

static void AppendAlterTableSchemaStmt(StringInfo buf, AlterObjectSchemaStmt* stmt)
{
    Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

    bool isForeignTable = stmt->objectType == OBJECT_FOREIGN_TABLE;
    appendStringInfo(buf, "ALTER %sTABLE ", isForeignTable ? "FOREIGN " : "");
    if (stmt->missing_ok) {
        appendStringInfo(buf, "IF EXISTS ");
    }
    char* tableName =
        quote_qualified_identifier(stmt->relation->schemaname, stmt->relation->relname);
    const char* newSchemaName = quote_identifier(stmt->newschema);
    appendStringInfo(buf, "%s SET SCHEMA %s;", tableName, newSchemaName);
}

/*
 * DeparseAlterTableStmt builds and returns a string representing the
 * AlterTableStmt where the object acted upon is of kind OBJECT_TABLE
 */
char* DeparseAlterTableStmt(Node* node)
{
    AlterTableStmt* stmt = castNode(AlterTableStmt, node);
    StringInfoData str = {0};
    initStringInfo(&str);

    Assert(stmt->relkind == OBJECT_TABLE);

    AppendAlterTableStmt(&str, stmt);
    return str.data;
}

/*
 * AppendAlterTableStmt builds and returns an SQL command representing an
 * ALTER TABLE statement from given AlterTableStmt object where the object
 * acted upon is of kind OBJECT_TABLE
 */
static void AppendAlterTableStmt(StringInfo buf, AlterTableStmt* stmt)
{
    const char* identifier =
        quote_qualified_identifier(stmt->relation->schemaname, stmt->relation->relname);
    ListCell* cmdCell = NULL;

    Assert(stmt->relkind == OBJECT_TABLE);

    appendStringInfo(buf, "ALTER TABLE %s", identifier);
    foreach (cmdCell, stmt->cmds) {
        if (cmdCell != list_head(stmt->cmds)) {
            appendStringInfoString(buf, ", ");
        }

        AlterTableCmd* alterTableCmd = castNode(AlterTableCmd, lfirst(cmdCell));
        AppendAlterTableCmd(buf, alterTableCmd, stmt);
    }

    appendStringInfoString(buf, ";");
}

/*
 * AppendColumnNameList converts a list of columns into comma separated string format
 * (colname_1, colname_2, .., colname_n).
 */
void AppendColumnNameList(StringInfo buf, List* columns)
{
    appendStringInfoString(buf, " (");

    ListCell* lc;
    bool firstkey = true;

    foreach (lc, columns) {
        if (firstkey == false) {
            appendStringInfoString(buf, ", ");
        }

        appendStringInfo(buf, "%s", quote_identifier(strVal(lfirst(lc))));
        firstkey = false;
    }

    appendStringInfoString(buf, " )");
}

/*
 * AppendAlterTableCmdConstraint builds a string required to create given
 * constraint as part of an ADD CONSTRAINT or an ADD COLUMN subcommand,
 * and appends it to the buf.
 */
static void AppendAlterTableCmdConstraint(StringInfo buf, Constraint* constraint,
                                          AlterTableStmt* stmt, AlterTableType subtype)
{
    if (subtype != AT_AddConstraint && subtype != AT_AddColumn) {
        ereport(ERROR, (errmsg("Unsupported alter table subtype: %d", (int)subtype)));
    }

    /* Need to deparse the alter table constraint command only if we are adding a
     * constraint name.*/
    if (constraint->conname == NULL) {
        ereport(
            ERROR,
            (errmsg("Constraint name can not be NULL when deparsing the constraint.")));
    }

    if (subtype == AT_AddConstraint) {
        appendStringInfoString(buf, " ADD CONSTRAINT ");
    } else {
        appendStringInfoString(buf, " CONSTRAINT ");
    }

    appendStringInfo(buf, "%s ", quote_identifier(constraint->conname));

    /* postgres version >= PG15
     * UNIQUE [ NULLS [ NOT ] DISTINCT ] ( column_name [, ... ] ) [ INCLUDE ( column_name
     * [, ...]) ] postgres version < PG15 UNIQUE ( column_name [, ... ] ) [ INCLUDE (
     * column_name [, ...]) ] PRIMARY KEY ( column_name [, ... ] ) [ INCLUDE ( column_name
     * [, ...]) ]
     */
    if (constraint->contype == CONSTR_PRIMARY || constraint->contype == CONSTR_UNIQUE) {
        if (constraint->contype == CONSTR_PRIMARY) {
            appendStringInfoString(buf, " PRIMARY KEY ");
        } else {
            appendStringInfoString(buf, " UNIQUE");
#ifdef DISABLE_OG_COMMENTS
            if (constraint->nulls_not_distinct == true) {
                appendStringInfoString(buf, " NULLS NOT DISTINCT");
#endif
            }

            if (subtype == AT_AddConstraint) {
                AppendColumnNameList(buf, constraint->keys);
            }

            if (constraint->including != NULL) {
                appendStringInfoString(buf, " INCLUDE ");

                AppendColumnNameList(buf, constraint->including);
            }

            if (constraint->options != NIL) {
                appendStringInfoString(buf, " WITH(");

                ListCell* defListCell;
                foreach (defListCell, constraint->options) {
                    DefElem* def = (DefElem*)lfirst(defListCell);

                    bool first = (defListCell == list_head(constraint->options));
                    appendStringInfo(buf, "%s%s=%s", first ? "" : ",",
                                     quote_identifier(def->defname),
                                     quote_literal_cstr(defGetString(def)));
                }

                appendStringInfoChar(buf, ')');
            }
        }
        else if (constraint->contype == CONSTR_EXCLUSION)
        {
            /*
             * This block constructs the EXCLUDE clause which is in the following form:
             * EXCLUDE [ USING index_method ] ( exclude_element WITH operator [, ... ] )
             */
            appendStringInfoString(buf, " EXCLUDE ");

            if (constraint->access_method != NULL) {
                appendStringInfoString(buf, "USING ");
                appendStringInfo(buf, "%s ", quote_identifier(constraint->access_method));
            }

            appendStringInfoString(buf, " (");

            ListCell* lc;
            bool firstOp = true;

            foreach (lc, constraint->exclusions) {
                List* pair = (List*)lfirst(lc);

                Assert(list_length(pair) == 2);
                IndexElem* elem = linitial_node(IndexElem, pair);
                List* opname = lsecond_node(List, pair);
                if (firstOp == false) {
                    appendStringInfoString(buf, " ,");
                }

                ListCell* lc2;

                foreach (lc2, opname) {
                    appendStringInfo(buf, "%s WITH %s", quote_identifier(elem->name),
                                     strVal(lfirst(lc2)));
                }

                firstOp = false;
            }

            appendStringInfoString(buf, " )");
        }
        else if (constraint->contype == CONSTR_CHECK)
        {
            if (subtype == AT_AddColumn) {
                /*
                 * Preprocess should've rejected deparsing such an ALTER TABLE
                 * command but be on the safe side.
                 */
                ereport(ERROR, (errmsg("cannot add check constraint to column by "
                                       "using ADD COLUMN command"),
                                errhint("Consider using ALTER TABLE ... ADD CONSTRAINT "
                                        "... CHECK command after adding the column")));
            }

            LOCKMODE lockmode = AlterTableGetLockLevel(stmt->cmds);
            Oid leftRelationId = AlterTableLookupRelation(stmt, lockmode);

            /* To be able to use deparse_expression function, which creates an expression
             * string, the expression should be provided in its cooked form. We transform
             * the raw expression to cooked form.
             */
            ParseState* pstate = make_parsestate(NULL);
            Relation relation = table_open(leftRelationId, AccessShareLock);

            /* Add table name to the name space in  parse state. Otherwise column names
             * cannot be found.
             */
            AddRangeTableEntryToQueryCompat(pstate, relation);

            Node* exprCooked = transformExpr(pstate, constraint->raw_expr,

                                             EXPR_KIND_CHECK_CONSTRAINT);

            char* relationName = get_rel_name(leftRelationId);
            List* relationCtx = deparse_context_for(relationName, leftRelationId);

            char* exprSql = deparse_expression(exprCooked, relationCtx, false, false);

            relation_close(relation, NoLock);

            appendStringInfo(buf, " CHECK (%s)", exprSql);

            if (constraint->is_no_inherit) {
                appendStringInfo(buf, " NO INHERIT");
            }
        }
        else if (constraint->contype == CONSTR_FOREIGN)
        {
            if (subtype == AT_AddConstraint) {
                appendStringInfoString(buf, " FOREIGN KEY");

                AppendColumnNameList(buf, constraint->fk_attrs);
            }

            appendStringInfoString(buf, " REFERENCES");

            appendStringInfo(buf, " %s",
                             quote_qualified_identifier(constraint->pktable->schemaname,
                                                        constraint->pktable->relname));

            if (list_length(constraint->pk_attrs) > 0) {
                AppendColumnNameList(buf, constraint->pk_attrs);
            }

            /* Append supported options if provided */

            /* FKCONSTR_MATCH_SIMPLE is default. Append matchtype if not default */
            if (constraint->fk_matchtype == FKCONSTR_MATCH_FULL) {
                appendStringInfoString(buf, " MATCH FULL");
            }

            switch (constraint->fk_del_action) {
                case FKCONSTR_ACTION_SETDEFAULT: {
                    appendStringInfoString(buf, " ON DELETE SET DEFAULT");
                    break;
                }

                case FKCONSTR_ACTION_SETNULL: {
                    appendStringInfoString(buf, " ON DELETE SET NULL");
                    break;
                }

                case FKCONSTR_ACTION_NOACTION: {
                    appendStringInfoString(buf, " ON DELETE NO ACTION");
                    break;
                }

                case FKCONSTR_ACTION_RESTRICT: {
                    appendStringInfoString(buf, " ON DELETE RESTRICT");
                    break;
                }

                case FKCONSTR_ACTION_CASCADE: {
                    appendStringInfoString(buf, " ON DELETE CASCADE");
                    break;
                }

                default: {
                    elog(ERROR, "unsupported FK delete action type: %d",
                         (int)constraint->fk_del_action);
                    break;
                }
            }

            switch (constraint->fk_upd_action) {
                case FKCONSTR_ACTION_SETDEFAULT: {
                    appendStringInfoString(buf, " ON UPDATE SET DEFAULT");
                    break;
                }

                case FKCONSTR_ACTION_SETNULL: {
                    appendStringInfoString(buf, " ON UPDATE SET NULL");
                    break;
                }

                case FKCONSTR_ACTION_NOACTION: {
                    appendStringInfoString(buf, " ON UPDATE NO ACTION");
                    break;
                }

                case FKCONSTR_ACTION_RESTRICT: {
                    appendStringInfoString(buf, " ON UPDATE RESTRICT");
                    break;
                }

                case FKCONSTR_ACTION_CASCADE: {
                    appendStringInfoString(buf, " ON UPDATE CASCADE");
                    break;
                }

                default: {
                    elog(ERROR, "unsupported FK update action type: %d",
                         (int)constraint->fk_upd_action);
                    break;
                }
            }
        }

        /*
         * For ADD CONSTRAINT subcommand, FOREIGN KEY and CHECK constraints migth
         * have NOT VALID option.
         *
         * Note that skip_validation might be true for an ADD COLUMN too but this
         * is not because Postgres supports this but because Citus sets this flag
         * to true for foreign key constraints added via ADD COLUMN. So we don't
         * check for skip_validation for ADD COLUMN subcommand.
         */
        if (subtype == AT_AddConstraint && constraint->skip_validation) {
            appendStringInfoString(buf, " NOT VALID ");
        }

        if (subtype == AT_AddColumn &&
            (constraint->deferrable || constraint->initdeferred)) {
            /*
             * For ADD COLUMN subcommand, the fact that whether given constraint
             * is deferrable or initially deferred is indicated by another Constraint
             * object, not via deferrable / initdeferred fields.
             */
            ereport(ERROR, (errmsg("unexpected value set for deferrable/initdeferred "
                                   "field for an ADD COLUMN subcommand")));
        }

        if (constraint->deferrable) {
            appendStringInfoString(buf, " DEFERRABLE");

            if (constraint->initdeferred) {
                appendStringInfoString(buf, " INITIALLY DEFERRED");
            }
        }
    }

    /*
     * AppendAlterTableCmd builds and appends to the given buffer a command
     * from given AlterTableCmd object. Currently supported commands are of type
     * AT_AddColumn, AT_SetNotNull and AT_AddConstraint {PRIMARY KEY, UNIQUE, EXCLUDE}.
     */
    static void AppendAlterTableCmd(StringInfo buf, AlterTableCmd * alterTableCmd,
                                    AlterTableStmt * stmt)
    {
        switch (alterTableCmd->subtype) {
            case AT_AddColumn: {
                AppendAlterTableCmdAddColumn(buf, alterTableCmd, stmt);
                break;
            }

            case AT_DropConstraint: {
                AppendAlterTableCmdDropConstraint(buf, alterTableCmd);
                break;
            }

            case AT_AddConstraint: {
                Constraint* constraint = (Constraint*)alterTableCmd->def;

                /* We need to deparse ALTER TABLE ... ADD {PRIMARY KEY, UNIQUE, EXCLUSION}
                 * commands into ALTER TABLE ... ADD CONSTRAINT <conname> {PRIMARY KEY,
                 * UNIQUE, EXCLUSION} ... format to be able add a constraint name.
                 */
                if (ConstrTypeCitusCanDefaultName(constraint->contype)) {
                    AppendAlterTableCmdConstraint(buf, constraint, stmt,
                                                  AT_AddConstraint);
                    break;
                }
            }

                /* fallthrough */

            default: {
                ereport(ERROR,
                        (errmsg("unsupported subtype for alter table command"),
                         errdetail("sub command type: %d", alterTableCmd->subtype)));
            }
        }
    }

    /*
     * GeneratedWhenStr returns the char representation of given generated_when
     * value.
     */
    static const char* GeneratedWhenStr(char generatedWhen)
    {
        switch (generatedWhen) {
            case 'a': {
                return "ALWAYS";
            }

            case 'd': {
                return "BY DEFAULT";
            }

            default:
                ereport(ERROR,
                        (errmsg("unrecognized generated_when: %d", generatedWhen)));
        }

        return nullptr;
    }

    /*
     * DeparseRawExprForColumnDefault returns string representation of given
     * rawExpr based on given column type information.
     */
    static char* DeparseRawExprForColumnDefault(Oid relationId, Oid columnTypeId,
                                                int32 columnTypeMod, char* columnName,
                                                char attgenerated, Node* rawExpr)
    {
        ParseState* pstate = make_parsestate(NULL);
        Relation relation = RelationIdGetRelation(relationId);
        auto typeTuple = (Form_pg_type)GETSTRUCT(typeidType(columnTypeId));
        AddRangeTableEntryToQueryCompat(pstate, relation);

        Node* defaultExpr =
            cookDefault(pstate, rawExpr, columnTypeId, columnTypeMod,
                        typeTuple->typcollation, columnName, attgenerated);

        List* deparseContext = deparse_context_for(get_rel_name(relationId), relationId);

        int saveNestLevel = PushEmptySearchPath();
        char* defaultExprStr =
            deparse_expression(defaultExpr, deparseContext, false, false);
        PopEmptySearchPath(saveNestLevel);

        RelationClose(relation);

        return defaultExprStr;
    }

    /*
     * AppendAlterTableCmd builds and appends to the given buffer an AT_AddColumn command
     * from given AlterTableCmd object in the form ADD COLUMN ...
     */
    static void AppendAlterTableCmdAddColumn(
        StringInfo buf, AlterTableCmd * alterTableCmd, AlterTableStmt * stmt)
    {
        Assert(alterTableCmd->subtype == AT_AddColumn);

        Oid relationId = AlterTableLookupRelation(stmt, NoLock);

        appendStringInfoString(buf, " ADD COLUMN ");

        if (alterTableCmd->missing_ok) {
            appendStringInfoString(buf, "IF NOT EXISTS ");
        }

        ColumnDef* columnDefinition = (ColumnDef*)alterTableCmd->def;

        appendStringInfo(buf, "%s ", quote_identifier(columnDefinition->colname));

        int32 typmod = 0;
        Oid typeOid = InvalidOid;
        bits16 formatFlags = FORMAT_TYPE_TYPEMOD_GIVEN | FORMAT_TYPE_FORCE_QUALIFY;
        typenameTypeIdAndMod(NULL, columnDefinition->typname, &typeOid, &typmod);
        appendStringInfo(buf, "%s", format_type_extended(typeOid, typmod, formatFlags));

        if (columnDefinition->cmprs_mode != ATT_CMPR_UNDEFINED) {
            // add the compress mode for this column
            switch (columnDefinition->cmprs_mode) {
                case ATT_CMPR_NOCOMPRESS:
                    appendStringInfoString(buf, " NOCOMPRESS ");
                    break;
                case ATT_CMPR_DELTA:
                    appendStringInfoString(buf, " DELTA ");
                    break;
                case ATT_CMPR_DICTIONARY:
                    appendStringInfoString(buf, " DICTIONARY ");
                    break;
                case ATT_CMPR_PREFIX:
                    appendStringInfoString(buf, " PREFIX ");
                    break;
                case ATT_CMPR_NUMSTR:
                    appendStringInfoString(buf, " NUMSTR ");
                    break;
                default:
                    // do nothing
                    break;
            }
        }

        Oid collationOid = GetColumnDefCollation(NULL, columnDefinition, typeOid);
        if (OidIsValid(collationOid)) {
            const char* identifier = FormatCollateBEQualified(collationOid);
            appendStringInfo(buf, " COLLATE %s", identifier);
        }

        ListCell* constraintCell = NULL;
        foreach (constraintCell, columnDefinition->constraints) {
            Constraint* constraint = (Constraint*)lfirst(constraintCell);

            if (constraint->contype == CONSTR_NOTNULL) {
                appendStringInfoString(buf, " NOT NULL");
            } else if (constraint->contype == CONSTR_NULL) {
                appendStringInfoString(buf, " NULL");
            } else if (constraint->contype == CONSTR_DEFAULT) {
                char attgenerated = '\0';
                appendStringInfo(
                    buf, " DEFAULT %s",
                    DeparseRawExprForColumnDefault(relationId, typeOid, typmod,
                                                   columnDefinition->colname,
                                                   attgenerated, constraint->raw_expr));
            }
#ifdef DISABLE_OG_COMMENTS
            else if (constraint->contype == CONSTR_IDENTITY) {
                /*
                 * Citus doesn't support adding identity columns via ALTER TABLE,
                 * so we don't bother teaching the deparser about them.
                 */
                ereport(ERROR, (errmsg("unexpectedly found identity column "
                                       "definition in ALTER TABLE command")));
            }
#endif
            else if (constraint->contype == CONSTR_GENERATED) {
                char attgenerated = 's';
                appendStringInfo(
                    buf, " GENERATED %s AS (%s) STORED",
                    GeneratedWhenStr(constraint->generated_when),
                    DeparseRawExprForColumnDefault(relationId, typeOid, typmod,
                                                   columnDefinition->colname,
                                                   attgenerated, constraint->raw_expr));
            } else if (constraint->contype == CONSTR_CHECK ||
                       constraint->contype == CONSTR_PRIMARY ||
                       constraint->contype == CONSTR_UNIQUE ||
                       constraint->contype == CONSTR_EXCLUSION ||
                       constraint->contype == CONSTR_FOREIGN) {
                AppendAlterTableCmdConstraint(buf, constraint, stmt, AT_AddColumn);
            } else if (constraint->contype == CONSTR_ATTR_DEFERRABLE) {
                appendStringInfoString(buf, " DEFERRABLE");
            } else if (constraint->contype == CONSTR_ATTR_NOT_DEFERRABLE) {
                appendStringInfoString(buf, " NOT DEFERRABLE");
            } else if (constraint->contype == CONSTR_ATTR_DEFERRED) {
                appendStringInfoString(buf, " INITIALLY DEFERRED");
            } else if (constraint->contype == CONSTR_ATTR_IMMEDIATE) {
                appendStringInfoString(buf, " INITIALLY IMMEDIATE");
            } else {
                ereport(ERROR, (errmsg("unsupported constraint type"),
                                errdetail("constraint type: %d", constraint->contype)));
            }
        }
    }

    /*
     * AppendAlterTableCmdDropConstraint builds and appends to the given buffer an
     * AT_DropConstraint command from given AlterTableCmd object in the form
     * DROP CONSTRAINT ...
     */
    static void AppendAlterTableCmdDropConstraint(StringInfo buf,
                                                  AlterTableCmd * alterTableCmd)
    {
        appendStringInfoString(buf, " DROP CONSTRAINT");

        if (alterTableCmd->missing_ok) {
            appendStringInfoString(buf, " IF EXISTS");
        }

        appendStringInfo(buf, " %s", quote_identifier(alterTableCmd->name));

        if (alterTableCmd->behavior == DROP_CASCADE) {
            appendStringInfoString(buf, " CASCADE");
        }
    }
